home *** CD-ROM | disk | FTP | other *** search
/ PCGUIA 127 / PC Guia 127.iso / Software / Produtividade / OpenOffice.org 2.0.1 / openofficeorg4.cab / test_socketserver.py < prev    next >
Text File  |  2005-11-19  |  5KB  |  181 lines

  1. # Test suite for SocketServer.py
  2.  
  3. from test import test_support
  4. from test.test_support import verbose, verify, TESTFN, TestSkipped
  5. test_support.requires('network')
  6.  
  7. from SocketServer import *
  8. import socket
  9. import select
  10. import time
  11. import threading
  12. import os
  13.  
  14. NREQ = 3
  15. DELAY = 0.5
  16.  
  17. class MyMixinHandler:
  18.     def handle(self):
  19.         time.sleep(DELAY)
  20.         line = self.rfile.readline()
  21.         time.sleep(DELAY)
  22.         self.wfile.write(line)
  23.  
  24. class MyStreamHandler(MyMixinHandler, StreamRequestHandler):
  25.     pass
  26.  
  27. class MyDatagramHandler(MyMixinHandler, DatagramRequestHandler):
  28.     pass
  29.  
  30. class MyMixinServer:
  31.     def serve_a_few(self):
  32.         for i in range(NREQ):
  33.             self.handle_request()
  34.     def handle_error(self, request, client_address):
  35.         self.close_request(request)
  36.         self.server_close()
  37.         raise
  38.  
  39. teststring = "hello world\n"
  40.  
  41. def receive(sock, n, timeout=20):
  42.     r, w, x = select.select([sock], [], [], timeout)
  43.     if sock in r:
  44.         return sock.recv(n)
  45.     else:
  46.         raise RuntimeError, "timed out on %s" % `sock`
  47.  
  48. def testdgram(proto, addr):
  49.     s = socket.socket(proto, socket.SOCK_DGRAM)
  50.     s.sendto(teststring, addr)
  51.     buf = data = receive(s, 100)
  52.     while data and '\n' not in buf:
  53.         data = receive(s, 100)
  54.         buf += data
  55.     verify(buf == teststring)
  56.     s.close()
  57.  
  58. def teststream(proto, addr):
  59.     s = socket.socket(proto, socket.SOCK_STREAM)
  60.     s.connect(addr)
  61.     s.sendall(teststring)
  62.     buf = data = receive(s, 100)
  63.     while data and '\n' not in buf:
  64.         data = receive(s, 100)
  65.         buf += data
  66.     verify(buf == teststring)
  67.     s.close()
  68.  
  69. class ServerThread(threading.Thread):
  70.     def __init__(self, addr, svrcls, hdlrcls):
  71.         threading.Thread.__init__(self)
  72.         self.__addr = addr
  73.         self.__svrcls = svrcls
  74.         self.__hdlrcls = hdlrcls
  75.     def run(self):
  76.         class svrcls(MyMixinServer, self.__svrcls):
  77.             pass
  78.         if verbose: print "thread: creating server"
  79.         svr = svrcls(self.__addr, self.__hdlrcls)
  80.         if verbose: print "thread: serving three times"
  81.         svr.serve_a_few()
  82.         if verbose: print "thread: done"
  83.  
  84. seed = 0
  85. def pickport():
  86.     global seed
  87.     seed += 1
  88.     return 10000 + (os.getpid() % 1000)*10 + seed
  89.  
  90. host = "localhost"
  91. testfiles = []
  92. def pickaddr(proto):
  93.     if proto == socket.AF_INET:
  94.         return (host, pickport())
  95.     else:
  96.         fn = TESTFN + str(pickport())
  97.         if os.name == 'os2':
  98.             # AF_UNIX socket names on OS/2 require a specific prefix
  99.             # which can't include a drive letter and must also use
  100.             # backslashes as directory separators
  101.             if fn[1] == ':':
  102.                 fn = fn[2:]
  103.             if fn[0] in (os.sep, os.altsep):
  104.                 fn = fn[1:]
  105.             fn = os.path.join('\socket', fn)
  106.             if os.sep == '/':
  107.                 fn = fn.replace(os.sep, os.altsep)
  108.             else:
  109.                 fn = fn.replace(os.altsep, os.sep)
  110.         testfiles.append(fn)
  111.         return fn
  112.  
  113. def cleanup():
  114.     for fn in testfiles:
  115.         try:
  116.             os.remove(fn)
  117.         except os.error:
  118.             pass
  119.     testfiles[:] = []
  120.  
  121. def testloop(proto, servers, hdlrcls, testfunc):
  122.     for svrcls in servers:
  123.         addr = pickaddr(proto)
  124.         if verbose:
  125.             print "ADDR =", addr
  126.             print "CLASS =", svrcls
  127.         t = ServerThread(addr, svrcls, hdlrcls)
  128.         if verbose: print "server created"
  129.         t.start()
  130.         if verbose: print "server running"
  131.         for i in range(NREQ):
  132.             time.sleep(DELAY)
  133.             if verbose: print "test client", i
  134.             testfunc(proto, addr)
  135.         if verbose: print "waiting for server"
  136.         t.join()
  137.         if verbose: print "done"
  138.  
  139. tcpservers = [TCPServer, ThreadingTCPServer]
  140. if hasattr(os, 'fork') and os.name not in ('os2',):
  141.     tcpservers.append(ForkingTCPServer)
  142. udpservers = [UDPServer, ThreadingUDPServer]
  143. if hasattr(os, 'fork') and os.name not in ('os2',):
  144.     udpservers.append(ForkingUDPServer)
  145.  
  146. if not hasattr(socket, 'AF_UNIX'):
  147.     streamservers = []
  148.     dgramservers = []
  149. else:
  150.     class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass
  151.     streamservers = [UnixStreamServer, ThreadingUnixStreamServer]
  152.     if hasattr(os, 'fork') and os.name not in ('os2',):
  153.         streamservers.append(ForkingUnixStreamServer)
  154.     class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass
  155.     dgramservers = [UnixDatagramServer, ThreadingUnixDatagramServer]
  156.     if hasattr(os, 'fork') and os.name not in ('os2',):
  157.         dgramservers.append(ForkingUnixDatagramServer)
  158.  
  159. def testall():
  160.     testloop(socket.AF_INET, tcpservers, MyStreamHandler, teststream)
  161.     testloop(socket.AF_INET, udpservers, MyDatagramHandler, testdgram)
  162.     if hasattr(socket, 'AF_UNIX'):
  163.         testloop(socket.AF_UNIX, streamservers, MyStreamHandler, teststream)
  164.         # Alas, on Linux (at least) recvfrom() doesn't return a meaningful
  165.         # client address so this cannot work:
  166.         ##testloop(socket.AF_UNIX, dgramservers, MyDatagramHandler, testdgram)
  167.  
  168. def test_main():
  169.     import imp
  170.     if imp.lock_held():
  171.         # If the import lock is held, the threads will hang.
  172.         raise TestSkipped("can't run when import lock is held")
  173.  
  174.     try:
  175.         testall()
  176.     finally:
  177.         cleanup()
  178.  
  179. if __name__ == "__main__":
  180.     test_main()
  181.